

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.recommendation.{Rating, MatrixFactorizationModel, ALS}
import org.apache.spark.rdd.RDD
import util.{RDDOperation, ScopeCaculateUtil, GlobalArgs}

/**
  * Driver that merges order and action ratings, trains ALS models over a grid of
  * `rank` / `lambda` values and prints the root-mean-square error of every combination.
  *
  * Created by chenjianwen on 2016/3/2.
  */
object Main {

  /** Base score value. Not referenced inside this object; presumably consumed elsewhere (confirm). */
  val BASE_SCORE: Int = 5

  /** Number of seconds in three 30-day months. */
  val THREE_MONTH: Int = 3*30*24*3600
  /** Number of seconds in six 30-day months. */
  val SIX_MONTH: Int = 6*30*24*3600
  /** Number of seconds in nine 30-day months. */
  val NINE_MONTH: Int = 9*30*24*3600
  /** Epoch time in seconds, captured once when this object is initialised. */
  val currentTimeInSec: Long = System.currentTimeMillis()/1000

  /**
    * Loads the order and action ratings, merges them, then grid-searches ALS
    * hyper-parameters and prints the error of each combination and the best one.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val orderDataHandler = new YCFOrderDataHandler(GlobalArgs.order_path)
    val actDataToHandler = new YCFActionDataHandler(GlobalArgs.act_path)

    val orderRDD = orderDataHandler.ratingRdd(ScopeCaculateUtil.time2Score)
    val actRdd = actDataToHandler.ratingRdd()

    println(s"订单数据 总量   ${orderRDD.count()}")
    println(s"行为数据总量    ${actRdd.count()}")

    // Add up the scores of the two RDDs.
    val terminateRDD = RDDOperation.addActAndOrderScope(orderRDD,actRdd)

    println(s"合并之后的数据量：${terminateRDD.count()}")

    // Parameter grid: every (rank, lambda) pair is trained and scored.
    val ranks = Array(4,5,6,7)
    val lambdas = Array(0.2,0.1,0.01,0.005,0.0025)
    val iterationNum = 12
    val paramGrid = for (rank <- ranks; lambda <- lambdas) yield (rank, lambda)

    // Both sets are traversed once per grid entry, so keep them cached instead of
    // recomputing the whole lineage for every training / evaluation round.
    // NOTE(review): the models are trained on the full data set and scored on a subset of
    // that same data (entries whose first tuple element is > 5); the complementary split
    // (<= 5) is never used. If a held-out validation was intended, train on that split instead.
    val beTrain2 = terminateRDD.filter(x=>x._1>5).map(x=>x._2).cache()
    val beTrain3_all = terminateRDD.map(_._2).cache()

    // Train every combination and keep only the model with the smallest error.
    println(s"开始训练，数据量：${beTrain3_all.count()}")
    val best = paramGrid.foldLeft(Option.empty[(Int, Double, MatrixFactorizationModel, Double)]) {
      case (bestSoFar, (rank, lambda)) =>
        val model = ALS.train(beTrain3_all,rank,iterationNum,lambda)
        val rmsd = computeRMSD(model,beTrain2)
        println(s"rank:${rank},lambda:${lambda},均方根误差:${rmsd}")
        bestSoFar match {
          // An undefined score never becomes the best candidate.
          case _ if rmsd.isNaN => bestSoFar
          // Strictly better incumbent stays; on a tie the later combination wins.
          case Some((_, _, _, bestRmsd)) if bestRmsd < rmsd => bestSoFar
          case _ => Some((rank, lambda, model, rmsd))
        }
    }

    best match {
      case Some((rank, lambda, _, rmsd)) =>
        println(s"最优参数 rank:${rank},lambda:${lambda},均方根误差:${rmsd}")
      case None =>
        println("没有可用于评估的数据，未能选出最优模型")
    }
  }

  /**
    * Computes the root-mean-square error between the model's predictions and the real ratings.
    * The same figure can also be obtained from `RegressionMetrics` built from an
    * `RDD[(Double, Double)]` via `rootMeanSquaredError`.
    *
    * @param model    the trained model
    * @param realdata the real ratings to compare against
    * @return the root-mean-square error over all (user, product) pairs that have both a
    *         prediction and a real rating, or `Double.NaN` when there is no such pair
    */
  def computeRMSD(model:MatrixFactorizationModel,realdata:RDD[Rating]): Double = {
    // Ask the model for a prediction for every (user, product) pair in the real data.
    val userProducts = realdata.map(r => (r.user, r.product))
    val actualByPair: RDD[((Int,Int),Double)] = realdata.map(r => ((r.user, r.product), r.rating))
    val predictedByPair: RDD[((Int,Int),Double)] =
      model.predict(userProducts).map(r => ((r.user, r.product), r.rating))

    // fold (unlike reduce) tolerates an empty RDD, so an empty join yields (0.0, 0) here.
    val (squaredErrorSum, pairCount) = predictedByPair.join(actualByPair)
      .map { case (_, (predicted, actual)) =>
        val diff = predicted - actual
        (diff * diff, 1L)
      }
      .fold((0.0, 0L)) { case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB) }

    if (pairCount == 0L) Double.NaN else math.sqrt(squaredErrorSum / pairCount)
  }

}
